support unixfs dag in mater #728
Conversation
d6ab6b4 to fe052bb (Compare)
cdbc77a to 3edb9d5 (Compare)
I didn't audit the balanced streams; I'll leave that for the final review.
sizes.push(link_info.raw_data_length);
links.push(PbLink {
    cid: *child_cid,
    name: Some("".to_string()),
Couldn't it be None instead?
Ideally, yes, but right now if we use None some tests will fail:
failures:
stores::blockstore::tests::byte_eq_spaceglenda
stores::blockstore::tests::dedup_lorem_roundtrip
stores::filestore::test::test_spaceglenda_roundtrip
v2::writer::tests::full_spaceglenda
That's because changing the link's name from Some("") to None affects the serialized output. In our DAG-PB encoding, "Name" is a field in the protobuf message. When you set it to Some(""), even though the string is empty, the encoder still writes the tag and a length (which will be 0). In contrast, if you set it to None, the field is omitted entirely. This difference makes the final serialized CAR file slightly smaller (and shifts the offsets of the index entries) compared to the expected reference produced by go-car or as specified by our tests.
Since our tests compare exact byte lengths and offsets (for example, expecting 1358 bytes versus 1350 bytes), the change leads to mismatches. If the expected behavior (or the spec) requires that an empty name is explicitly encoded as an empty string, we need to keep using Some("").
"No name" and "empty name" are not equivalent; they differ in the encoding. The tests check the exact binary output, so we have to keep the field present as Some("") if we want to match the expected output.
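To make the size difference concrete, here is a small standalone sketch (my illustration, not the mater code) of how a protobuf length-delimited field behaves; in DAG-PB the PBLink Name field is field number 2, so its tag byte is 0x12:

/// Hypothetical illustration of the DAG-PB `Name` field (protobuf field 2,
/// wire type 2): an empty-but-present string still costs a tag byte plus a
/// zero length byte, while `None` writes nothing at all.
fn name_field_bytes(name: Option<&str>) -> Vec<u8> {
    match name {
        Some(s) => {
            // tag = (field_number << 3) | wire_type = (2 << 3) | 2 = 0x12;
            // the single-byte length only holds for short names, which is
            // fine for this illustration.
            let mut out = vec![0x12, s.len() as u8];
            out.extend_from_slice(s.as_bytes());
            out
        }
        // Absent optional field: omitted entirely from the encoding.
        None => Vec::new(),
    }
}

fn main() {
    assert_eq!(name_field_bytes(Some("")), vec![0x12, 0x00]); // 2 bytes on the wire
    assert_eq!(name_field_bytes(None), Vec::<u8>::new()); // 0 bytes on the wire
}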
I remember now. Can you leave a comment explaining the edge case, since we're looking to match go-car's impl?
9a9746a to 208c554 (Compare)
208c554 to 636873a (Compare)
    raw,
);

let cid = convert_file_to_car(&input_path, &output_path, config, false).await?;
We seem to not overwrite by default now. If we do not want to support overwriting, can we remove this boolean?
Let's just not remove the overwrite flag at all; what isn't broken doesn't need fixing :)
We are supporting it (see #728 (comment)).
The overwrite flag was useful in our experiments, I'd let it be.
Publishing a partial review.
@@ -23,6 +24,8 @@ use tokio_util::{
use tower_http::trace::TraceLayer;
use uuid::Uuid;

type BoxedStream = Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>;
    tracing::error!(%err, "failed to execute blocking task");
    (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
})??;

// Branching needed here since the resulting `StreamReader`s don't have the same type
// Determine how to obtain the file's bytes:
let file_cid = if request.headers().contains_key("Content-Type") {
This change is the biggest in the upload function. I am trying to understand what it actually changes, but I can't see the reason why the custom stream is needed. Field already implements the Stream trait.
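For illustration, a minimal sketch (the helper name and signature are mine, not the PR's) of how any such byte stream, a multipart Field included, can back a StreamReader once its error type is mapped to io::Error, with no hand-rolled stream type:

use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use tokio::io::AsyncRead;
use tokio_util::io::StreamReader;

// Generic helper: wrap any byte stream in a StreamReader. A multipart Field
// (which already implements Stream) would satisfy the `S` bound directly.
fn reader_from_stream<S, E>(stream: S) -> impl AsyncRead
where
    S: Stream<Item = Result<Bytes, E>>,
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    StreamReader::new(stream.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)))
}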
I hadn't noticed this before. Why is one branch changed and not the other? Currently, this code is working; why move to a custom-made stream?
/// Reads bytes from the source and writes them to a CAR file.
/// Converts a source stream into a CARv2 file and writes it to an output stream.
///
/// Send + 'static bounds are required because the UnixFS processing involves:
The bounds descriptions are not needed in the docs.
/// Catch-all error for miscellaneous cases.
#[error("other error: {0}")]
Other(String),
This Other variant is used only once. Instead of using Other you should use CidError or InvalidCid and remove this variant.
I'm sorry but I can't allow this PR to go through with all the seemingly unrelated refactors.
Some of them are even clear downgrades, like removing logging from unhandled errors.
    tracing::error!(%err, path = %file_path.display(), "failed to remove uploaded piece");
}

let _ = tokio::fs::remove_file(&file_path).await;
You're removing the log and not returning the error. Silently failing and leaving the user in the dark IS NOT OK.
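For reference, a sketch of keeping the log while still ignoring the failure, which is essentially what the removed line did (log fields copied from the original call):

if let Err(err) = tokio::fs::remove_file(&file_path).await {
    // Ignore the failure but keep the operator-facing signal.
    tracing::error!(%err, path = %file_path.display(), "failed to remove uploaded piece");
}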
.await
.map_err(|err| {
    tracing::error!(%err, "failed to rename the CAR file");
    (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
})
.await?;
})?;
Any reason the await is better here?
// We need to rename the file since the original storage name is based on the whole deal proposal CID,
// however, the piece is stored based on its piece_cid
I don't understand why this is being removed
// Calculate the piece commitment in the blocking thread pool since `calculate_piece_commitment`
// is CPU intensive — i.e. blocking — potentially improvement is to move this completely out of
// the tokio runtime into an OS thread
Recurring theme: why are you removing comments that explain the context behind the decisions?
let _ = tokio::fs::remove_file(&file_path).await.inspect_err(
    |err| tracing::error!(%err, path = %file_path.display(), "failed to delete file"),
);
Removing the error logs again. Please explain why this would be a good idea
.await
.map_err(|err| {
    tracing::error!(%err, "failed to store file into CAR archive");
    (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
})?
} else {
    // Read the request body into a CAR archive
    // For direct uploads, convert the request body into a stream.
What is a direct upload?
let deal_cid = cid::Cid::from_str(&cid).map_err(|err| {
    tracing::error!(cid, "failed to parse cid");
    (StatusCode::BAD_REQUEST, err.to_string())
})?;

// Use deal_db (we need it now, so we clone it)
This comment adds no value. I don't get it: you're removing contextual comments that provide insight into why things are done the way they are, but adding comments like this where it's very obvious what's going on.
let proposed_deal =
    // Move the fetch to the blocking pool since the RocksDB API is sync
More context being removed
let read_bytes = source.read_buf(&mut buf).await?;
trace!(bytes_read = read_bytes, buffer_size = buf.len(), "Buffer read status");
// EOF but there's still content to yield -> yield it
while buf.len() >= chunk_size {
    // The buffer may have a larger capacity than chunk_size due to reserve
    // this also means that our read may have read more bytes than we expected,
    // thats why we check if the length if bigger than the chunk_size and if so
    // we split the buffer to the chunk_size, then freeze and return
    let chunk = buf.split_to(chunk_size);
    yield chunk.freeze();
} // otherwise, the buffer is not full, so we don't do a thing

if read_bytes == 0 && !buf.is_empty() {
    let chunk = buf.split();
    yield chunk.freeze();
    break;
} else if read_bytes == 0 {
    break;
}
Unless you can explain where the bug was, I expect these changes to be reverted.
I can't allow you to refactor a core piece of logic that is not broken and was the product of multiple discussions.
I think I understand what was going on.
If chunk_size == 2, but read_bytes == 0, and there is still buf.len() == 4 from the previous invocation, you should not yield it in its entirety, but continue chunking it.
It makes sense, HOWEVER.
Then there is a bug (or it should be a panic) in here:
if read_bytes == 0 && !buf.is_empty() {
    let chunk = buf.split();
    yield chunk.freeze();
    break;
because if we're chunking, and the loop above gave out all of the nicely sized chunks, why are we yielding something which is not chunk_size? Is this intended?
Imagine buf.len() == 5 and chunk_size == 2; it goes:
yield 2;
yield 2;
yield 1;
so the last yielded chunk won't be a nice chunk.
.....
Then I took a look at let mut buf = BytesMut::with_capacity(chunk_size); at first glance it looks like source.read_buf(&mut buf).await? is always going to read chunk_size bytes into buf. But is it?
Is BytesMut::with_capacity(chunk_size) guaranteeing that source.read_buf(&mut buf).await? will always read at most chunk_size bytes? A quick search told me it depends on the behaviour of the underlying read_buf, so I'm not sure.
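A small self-contained sketch of that scenario (my own test code, not from the PR), showing that the final buf.split() at EOF yields a chunk shorter than chunk_size:

use bytes::{Bytes, BytesMut};

// Drain a buffer the same way the chunker does at EOF: full chunks first,
// then whatever tail is left over.
fn drain(mut buf: BytesMut, chunk_size: usize) -> Vec<Bytes> {
    let mut chunks = Vec::new();
    while buf.len() >= chunk_size {
        chunks.push(buf.split_to(chunk_size).freeze());
    }
    if !buf.is_empty() {
        chunks.push(buf.split().freeze()); // tail chunk, shorter than chunk_size
    }
    chunks
}

fn main() {
    let mut buf = BytesMut::new();
    buf.extend_from_slice(b"abcde");
    let lens: Vec<usize> = drain(buf, 2).iter().map(|c| c.len()).collect();
    assert_eq!(lens, vec![2, 2, 1]); // yield 2; yield 2; yield 1;
}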
// Use `file_path` here instead of the undefined `piece_path`
let (piece_commitment, _) = commp(&file_path)?;
Nothing else changed. Was there a bug in delia?
if processed.contains(&current_cid) {
    continue;
This logic is duplicated in the DFS. We're already checking it here:
if !processed.contains(&link.cid) {
    to_process.push(link.cid);
}
// Write the raw block data. In a real UnixFS traversal you might need
// to reconstruct file content in order.
output.write_all(&block_bytes).await?;
How is that not a real UnixFS traversal?
I'm confused: what is extract_content_via_index supposed to do?
What do we mean by index if we are not actually using an index anywhere?
if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
    let mut cursor = std::io::Cursor::new(&block_bytes);
    // Propagate any error that occurs during decoding.
    let pb_node: ipld_dagpb::PbNode =
        DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
    for link in pb_node.links {
        if !processed.contains(&link.cid) {
            to_process.push(link.cid);
        }
    }
}
Suggested change:
if current_cid.codec() == crate::multicodec::DAG_PB_CODE {
    let mut cursor = std::io::Cursor::new(&block_bytes);
    // Propagate any error that occurs during decoding.
    let pb_node: ipld_dagpb::PbNode =
        DagPbCodec::decode(&mut cursor).map_err(Error::DagPbError)?;
    for link in pb_node.links {
        if !processed.contains(&link.cid) {
            to_process.push(link.cid);
        }
    }
} else {
    return Err(Error::UnsupportedCidCodec(current_cid.codec()));
}
impl<R> CarBlockStore<R>
where
    R: AsyncSeek + AsyncReadExt + Unpin,
{
Can't we pull these in under one impl block? They seem to be generic over the same trait bounds.
I don't know why one is AsyncSeek and the other AsyncSeekExt, though.
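A sketch of what a merged impl block could look like (placeholder struct body, just to show the bounds); AsyncReadExt and AsyncSeekExt are extension traits with blanket impls, so bounding on AsyncRead + AsyncSeek is enough to call their methods:

use tokio::io::{AsyncRead, AsyncSeek};

// Stand-in for the PR's CarBlockStore, only to show the shape of a single impl block.
struct CarBlockStore<R> {
    reader: R,
}

impl<R> CarBlockStore<R>
where
    // One set of bounds for all methods; AsyncReadExt/AsyncSeekExt methods are
    // available on anything that is AsyncRead/AsyncSeek + Unpin.
    R: AsyncRead + AsyncSeek + Unpin,
{
    fn new(reader: R) -> Self {
        Self { reader }
    }
}

fn main() {
    // std::io::Cursor implements tokio's AsyncRead and AsyncSeek.
    let _store = CarBlockStore::new(std::io::Cursor::new(vec![0u8; 4]));
}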
car_v1_start.try_into().unwrap(),
(index_offset - car_v1_start).try_into().unwrap(),
index_offset.try_into().unwrap(),
expects?
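For illustration (values and message are made up), the suggestion amounts to something like:

fn main() {
    let car_v1_start: u64 = 51;
    // `.expect` documents why the conversion is assumed infallible,
    // unlike a bare `.unwrap()`.
    let data_offset: usize = car_v1_start
        .try_into()
        .expect("CARv1 start offset should fit in usize");
    println!("{data_offset}");
}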
writer.finish().await?;

Ok(root.unwrap())
How are we sure that root is not None?
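One hedged alternative to the unwrap, sketched with placeholder types (the real code would use Cid and mater's Error enum):

#[derive(Debug)]
enum Error {
    // Hypothetical variant: the source produced no blocks, so there is no root.
    EmptyRoot,
}

fn finish(root: Option<String>) -> Result<String, Error> {
    // Surface the missing root as an error instead of panicking.
    root.ok_or(Error::EmptyRoot)
}

fn main() {
    assert!(finish(None).is_err());
    assert_eq!(finish(Some("bafy...".to_string())).unwrap(), "bafy...");
}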
let chunker = async_stream::try_stream! {
    let mut buf = BytesMut::with_capacity(chunk_size);
    loop {
        let read_bytes = source.read_buf(&mut buf).await?;
        while buf.len() >= chunk_size {
            let chunk = buf.split_to(chunk_size);
            yield chunk.freeze();
        }

        if read_bytes == 0 && !buf.is_empty() {
            let chunk = buf.split();
            yield chunk.freeze();
            break;
        } else if read_bytes == 0 {
            break;
        }
    }
};
If we're reusing the chunker across two methods, it deserves to be a separate method.
This logic is complex enough not to be duplicated by copy-pasting.
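Something along the lines of this sketch, where the function name and exact signature are assumptions and the body mirrors the PR's chunker:

use async_stream::try_stream;
use bytes::{Bytes, BytesMut};
use futures::Stream;
use tokio::io::{AsyncRead, AsyncReadExt};

// Shared chunker both import paths could call instead of duplicating the block.
fn chunk_source<S>(mut source: S, chunk_size: usize) -> impl Stream<Item = std::io::Result<Bytes>>
where
    S: AsyncRead + Unpin,
{
    try_stream! {
        let mut buf = BytesMut::with_capacity(chunk_size);
        loop {
            let read_bytes = source.read_buf(&mut buf).await?;
            // Emit every full chunk currently buffered.
            while buf.len() >= chunk_size {
                yield buf.split_to(chunk_size).freeze();
            }
            // EOF: flush the remaining tail (if any), then stop.
            if read_bytes == 0 {
                if !buf.is_empty() {
                    yield buf.split().freeze();
                }
                break;
            }
        }
    }
}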
    Ok(root.unwrap())
}

async fn balanced_import_unixfs<Src, Out>(
Those methods look almost exactly the same; I struggle to find the difference between the two.
- Add docs for this one
- Can we change it so it's clearer that most of the logic is actually the same? Like extracting some methods, etc.?
Description
This PR extends mater’s content extraction capabilities to support .car files that store UnixFS DAGs. Previously, extraction was limited to sequentially ordered blocks containing arbitrary content, where blocks were assumed to be in the correct order. With this change, mater can now correctly traverse and extract content from files wrapped in UnixFS DAG structures, as used in IPFS.
The implementation introduces a new code branch in the filestore creation and extraction paths. Specifically:
- The Config enum has been updated with additional constructors: balanced_unixfs(chunk_size, tree_width), for UnixFS-wrapped content (default behavior for IPFS compatibility), and balanced_raw(chunk_size, tree_width), for direct/raw storage without UnixFS metadata. These allow users to specify whether to process content as UnixFS or in raw mode.
- The CLI has been updated with new flags (raw, chunk_size, and tree_width) to control behavior.
- Several unit tests have been updated.
- Inline documentation has been updated to explain the logic and changes.
Checklist
Open questions (aside from the ones raised in the checklist)
Not sure whether to remove the overwrite field on Config, thoughts?
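For reference, a hypothetical usage sketch of the two constructors described above; the argument values are placeholders, not recommended defaults:

// UnixFS-wrapped content (IPFS-compatible default behavior).
let unixfs_config = Config::balanced_unixfs(256 * 1024, 174);
// Direct/raw storage without UnixFS metadata.
let raw_config = Config::balanced_raw(256 * 1024, 174);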